{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "69c1fcd9",
   "metadata": {},
   "source": [
    "## Data Cleansing and Transformation in Python"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b0ada6dd",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Import modules\n",
    "import pandas as pd \n",
    "import logging"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bbf29475",
   "metadata": {},
   "outputs": [],
   "source": [
    "# define paths to data files\n",
    "crash_data_file = \"traffic_crashes.csv\" \n",
    "vehicle_crash_data_file = \"traffic_crash_vehicle.csv\" "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ef51f9ed",
   "metadata": {},
   "outputs": [],
   "source": [
    "# import data as dataframes\n",
    "df_crashes = pd.read_csv(f\"data/{crash_data_file}\") \n",
    "df_vehicles= pd.read_csv(f\"data/{vehicle_crash_data_file}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "59e7f723",
   "metadata": {},
   "source": [
    "### Preliminary Tasks: The Importance of Staging Data  "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "82a14b3a",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_crashes.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a90f8100",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_crashes.info()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ad6a4934",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_crashes.isnull().sum()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4c861506",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Drop columns in which every value is missing; rebinding (rather than\n",
    "# inplace=True) keeps this cell consistent with the row-wise drop that follows\n",
    "df_crashes = df_crashes.dropna(axis='columns', how='all')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a36884d3",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_crashes = df_crashes.dropna(axis='index', thresh=2, inplace=False) "
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e15dd945",
   "metadata": {},
   "source": [
    "#### Working with Missing Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ed58b5d0",
   "metadata": {},
   "outputs": [],
   "source": [
    "# This column has only two values.  \n",
    "df_crashes['report_type'].unique()  # ['ON SCENE', 'NOT ON SCENE (DESK REPORT)'] "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "749f8db8",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Fill the missing report_type values with 'ON SCENE'\n",
    "df_crashes  = df_crashes.fillna(value={'report_type': 'ON SCENE'}) "
   ]
  },
  {
   "cell_type": "markdown",
   "id": "04a8a3fb",
   "metadata": {},
   "source": [
    "#### Merging Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "abaf4ed8",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Merge crashes and vehicles dataframes\n",
    "df = df_crashes.merge(df_vehicles, how = 'left',on='crash_record_id',suffixes=('_left', '_right')) \n",
    "print(df.shape)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "27e94752",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "205afca9",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_agg = df.groupby('vehicle_type').agg({'crash_record_id': 'count'}).reset_index()\n",
    "df_agg"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ccd11db7",
   "metadata": {},
   "outputs": [],
   "source": [
    "number_of_passenger_cars_involved = df_agg[df_agg['vehicle_type'] == 'PASSENGER']['crash_record_id'].array[0] \n",
    "number_of_passenger_cars_involved"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ddafcf9c",
   "metadata": {},
   "source": [
    "#### Data Mapping with Target Data "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bb27bd63",
   "metadata": {},
   "outputs": [],
   "source": [
    "# rename columns for data output\n",
    "vehicle_mapping = {'vehicle_type':'vehicletypes'}  \n",
    "df_agg = df_agg.rename(columns=vehicle_mapping)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7b052444",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_agg"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f074851e",
   "metadata": {},
   "source": [
    "### Writing Transformation Functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "73a246c6",
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_transformed_data(crash_file, vehicle_file): \n",
    "\n",
    "    # import data\n",
    "    df_crashes = pd.read_csv(f\"data/{crash_file}\") \n",
    "    df_vehicles= pd.read_csv(f\"data/{vehicle_file}\") \n",
    "    \n",
    "    # remove specified missing values\n",
    "    under_threshold_removed = df_crashes.dropna(axis='index', thresh=2, inplace=False) \n",
    "    under_threshold_rows = df_crashes[~df_crashes.index.isin(under_threshold_removed.index)] \n",
    "    df_crashes.fillna(value={'report_type': 'ON SCENE'}, inplace=True) \n",
    "    \n",
    "    # merge crashes and vehicles\n",
    "    df = df_crashes.merge(df_vehicles, how='left', on='crash_record_id', suffixes=('_left','_right')) \n",
    "    df_agg = df.groupby('vehicle_type').agg({'crash_record_id': 'count'}).reset_index() \n",
    "    \n",
    "    # transform column names for output data\n",
    "    vehicle_mapping = {'vehicle_type':'vehicletypes'}  \n",
    "    df_agg = df_agg.rename(columns=vehicle_mapping)\n",
    "\n",
    "    return df_agg"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9a5dee8c",
   "metadata": {},
   "outputs": [],
   "source": [
    "get_transformed_data(crash_data_file,vehicle_crash_data_file) "
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b37d26c6",
   "metadata": {},
   "source": [
    "### Running the Workflow"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "99898696",
   "metadata": {},
   "source": [
    "#### The preceding code can be split into reusable functions that are easy to manage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e9656323",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read data from data source  \n",
    "def read_datasources(source_name): \n",
    "    df = pd.read_csv(f\"data/{source_name}\")  \n",
    "    return df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6cc9ac0c",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Drop rows with null values \n",
    "def drop_rows_with_null_values(df): \n",
    "    under_threshold_removed = df.dropna(axis='index', thresh=2, inplace=False)  \n",
    "    df = df[~df.index.isin(under_threshold_removed.index)]  \n",
    "    return df "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "49f4dbf7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Fill missing values \n",
    "def fill_missing_values(df): \n",
    "    df = df.fillna(value={'report_type': 'ON SCENE'})  \n",
    "    return df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c401d9cc",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Merge Dataframes \n",
    "def merge_dataframes(df_vehicles,df_crashes): \n",
    "    df = df_crashes.merge(df_vehicles,how='left', on='crash_record_id', suffixes=('_left', '_right'))  \n",
    "    return df "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f00a82c3",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Rename Columns\n",
    "def rename_columns(df):\n",
    "    vehicle_mapping = {'vehicle_type' :  'vehicletypes'}  \n",
    "    df = df.rename(columns=vehicle_mapping)\n",
    "    return df"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "73f8953d",
   "metadata": {},
   "source": [
    "#### Define the Pipeline Functions to run the Cleansing and Transformation Functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "20248c12",
   "metadata": {},
   "outputs": [],
   "source": [
    "def read_data_pipeline(crash_file, vehicle_file):\n",
    "    \"\"\"Read the crash and vehicle data through ``read_datasources``.\n",
    "\n",
    "    Args:\n",
    "        crash_file: Source name of the crash data, passed to ``read_datasources``.\n",
    "        vehicle_file: Source name of the vehicle data, passed to ``read_datasources``.\n",
    "\n",
    "    Returns:\n",
    "        Tuple ``(df_crash, df_vehicle)``. A frame that could not be read is\n",
    "        returned as an empty DataFrame.\n",
    "    \"\"\"\n",
    "    # Start from empty frames so both names are bound even when a read fails\n",
    "    df_crash = pd.DataFrame()\n",
    "    df_vehicle = pd.DataFrame()\n",
    "    try:\n",
    "        df_crash = read_datasources(crash_file)\n",
    "        df_vehicle = read_datasources(vehicle_file)\n",
    "    except Exception:\n",
    "        # logging.exception logs at ERROR level and attaches the traceback\n",
    "        logging.exception(\"Exception in reading data pipeline\")\n",
    "    return df_crash, df_vehicle"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "806ab642",
   "metadata": {},
   "outputs": [],
   "source": [
    "def drop_rows_with_null_values_pipeline(df_crash, df_vehicle):\n",
    "    \"\"\"Apply ``drop_rows_with_null_values`` to the crash and vehicle frames.\n",
    "\n",
    "    Args:\n",
    "        df_crash: Crash DataFrame.\n",
    "        df_vehicle: Vehicle DataFrame.\n",
    "\n",
    "    Returns:\n",
    "        Tuple ``(df_crash, df_vehicle)``. If a step raises, the error is\n",
    "        logged and each frame is returned in the state it had at that point.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        df_crash = drop_rows_with_null_values(df_crash)\n",
    "        df_vehicle = drop_rows_with_null_values(df_vehicle)\n",
    "    except Exception:\n",
    "        # logging.exception logs at ERROR level and attaches the traceback\n",
    "        logging.exception(\"Exception in dropping rows with null value data pipeline\")\n",
    "    # Returning after the try block (not inside finally) lets non-Exception\n",
    "    # signals such as KeyboardInterrupt propagate\n",
    "    return df_crash, df_vehicle"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c98c4e5b",
   "metadata": {},
   "outputs": [],
   "source": [
    "def fill_missing_values_pipeline(df_crash, df_vehicle):\n",
    "    \"\"\"Apply ``fill_missing_values`` to the crash and vehicle frames.\n",
    "\n",
    "    Args:\n",
    "        df_crash: Crash DataFrame.\n",
    "        df_vehicle: Vehicle DataFrame.\n",
    "\n",
    "    Returns:\n",
    "        Tuple ``(df_crash, df_vehicle)``. If a step raises, the error is\n",
    "        logged and each frame is returned in the state it had at that point.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        df_crash = fill_missing_values(df_crash)\n",
    "        # Rebind df_vehicle itself so the filled frame is the one returned\n",
    "        df_vehicle = fill_missing_values(df_vehicle)\n",
    "    except Exception:\n",
    "        # logging.exception logs at ERROR level and attaches the traceback\n",
    "        logging.exception(\"Exception in filling missing value pipeline\")\n",
    "    return df_crash, df_vehicle"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e4fc80c0",
   "metadata": {},
   "outputs": [],
   "source": [
    "def merge_dataframes_pipeline(df_crash, df_vehicle):\n",
    "    \"\"\"Merge the crash and vehicle frames through ``merge_dataframes``.\n",
    "\n",
    "    Args:\n",
    "        df_crash: Crash DataFrame.\n",
    "        df_vehicle: Vehicle DataFrame.\n",
    "\n",
    "    Returns:\n",
    "        The merged DataFrame, or an empty DataFrame when the merge raises\n",
    "        (the error is logged).\n",
    "    \"\"\"\n",
    "    # Bound up front so a failed merge still returns a DataFrame\n",
    "    df_agg = pd.DataFrame()\n",
    "    try:\n",
    "        # Use the frames handed in by the caller, not notebook-level globals;\n",
    "        # merge_dataframes takes the vehicle frame first\n",
    "        df_agg = merge_dataframes(df_vehicle, df_crash)\n",
    "    except Exception:\n",
    "        # logging.exception logs at ERROR level and attaches the traceback\n",
    "        logging.exception(\"Exception in merge dataframes pipeline\")\n",
    "    return df_agg"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d36bfdd4",
   "metadata": {},
   "outputs": [],
   "source": [
    "def format_dataframes_pipeline(df_agg):\n",
    "    \"\"\"Rename the output columns of ``df_agg`` through ``rename_columns``.\n",
    "\n",
    "    Args:\n",
    "        df_agg: DataFrame to format.\n",
    "\n",
    "    Returns:\n",
    "        The renamed DataFrame, or ``df_agg`` unchanged when the rename\n",
    "        raises (the error is logged).\n",
    "    \"\"\"\n",
    "    # Fall back to the input so the name is bound even when renaming fails\n",
    "    df_output = df_agg\n",
    "    try:\n",
    "        df_output = rename_columns(df_agg)\n",
    "    except Exception:\n",
    "        # logging.exception logs at ERROR level and attaches the traceback\n",
    "        logging.exception(\"Exception in renaming dataframe columns pipeline\")\n",
    "    return df_output"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "891dc164",
   "metadata": {},
   "source": [
    "#### Use the Chicago Traffic Data and Run the Pipeline Workflow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2d3e2758",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define input data\n",
    "crash_data_file = \"traffic_crashes.csv\"\n",
    "vehicle_crash_data_file = \"traffic_crash_vehicle.csv\"\n",
    "\n",
    "# Read Data Pipeline (use the names defined above instead of re-typing them)\n",
    "df_crash, df_vehicle = read_data_pipeline(crash_data_file, vehicle_crash_data_file)\n",
    "\n",
    "# Drop Nulls\n",
    "df_crash, df_vehicle = drop_rows_with_null_values_pipeline(df_crash, df_vehicle)\n",
    "\n",
    "# Fill in Missing Values\n",
    "df_crash, df_vehicle = fill_missing_values_pipeline(df_crash, df_vehicle)\n",
    "\n",
    "# Merge Dataframes\n",
    "df_agg = merge_dataframes_pipeline(df_crash, df_vehicle)\n",
    "\n",
    "# Rename Columns for Output\n",
    "df_output = format_dataframes_pipeline(df_agg)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d9ac1fd9",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_output.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "829e3d45",
   "metadata": {},
   "source": [
    "### Transformation Activities in Python "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "165c0b5e",
   "metadata": {},
   "outputs": [],
   "source": [
    "READING_CRASH_DATA_PIPELINE = \"<NOT_EXECUTED>\" \n",
    "DROPPING_ROW_WITH_NULL_PIPELINE = \"<NOT_EXECUTED>\" \n",
    "FILLING_MISSING_VALUE_PIPELINE = \"<NOT_EXECUTED>\" \n",
    "MERGE_DATAFRAME_PIPELINE = \"<NOT_EXECUTED>\" "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "00cd6a81",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_crash, df_vehicle = read_data_pipeline(\"traffic_crashes.csv\", \"traffic_crash_vehicle.csv\")\n",
    "\n",
    "# Mark the read stage as done only when both frames actually contain data\n",
    "if not df_crash.empty and not df_vehicle.empty:\n",
    "    READING_CRASH_DATA_PIPELINE = \"<OK>\"\n",
    "else:\n",
    "    READING_CRASH_DATA_PIPELINE = \"<NOT_EXECUTED>\"\n",
    "\n",
    "# Independent `if` statements (not an elif chain) so that every stage runs\n",
    "# once the stage before it has been marked \"<OK>\".\n",
    "# NOTE(review): the pipeline helpers log and swallow their own errors, so a\n",
    "# flag set here only records that the stage returned - confirm whether the\n",
    "# helpers should report success explicitly.\n",
    "if READING_CRASH_DATA_PIPELINE == \"<OK>\":\n",
    "    df_crash, df_vehicle = drop_rows_with_null_values_pipeline(df_crash, df_vehicle)\n",
    "    DROPPING_ROW_WITH_NULL_PIPELINE = \"<OK>\"\n",
    "\n",
    "if DROPPING_ROW_WITH_NULL_PIPELINE == \"<OK>\":\n",
    "    df_crash, df_vehicle = fill_missing_values_pipeline(df_crash, df_vehicle)\n",
    "    FILLING_MISSING_VALUE_PIPELINE = \"<OK>\"\n",
    "\n",
    "if FILLING_MISSING_VALUE_PIPELINE == \"<OK>\":\n",
    "    # merge_dataframes_pipeline returns a single merged DataFrame\n",
    "    df_agg = merge_dataframes_pipeline(df_crash, df_vehicle)\n",
    "    MERGE_DATAFRAME_PIPELINE = \"<OK>\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "14dbf8f4",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
